用 Python 编写一个composition函数

Composition 函数(简称函数)是模板化 Crossplane 资源的自定义程序。 当你创建复合资源 (XR) 时,Crossplane 会调用 Composition 函数来决定它应该创建哪些资源。阅读 concepts 页面了解更多有关 Composition 函数的信息。

您可以使用通用编程语言为模板资源编写函数。 使用通用编程语言可以让函数对模板资源使用高级逻辑,如循环和条件。 本指南介绍如何在 Python 中编写 Composition 函数。

Important
在阅读本指南之前,最好先熟悉一下 Composition 功能的工作原理

了解步骤

本指南介绍为XBucketsComposition 资源 (XR) 的composition函数。

 1apiVersion: example.crossplane.io/v1
 2kind: XBuckets
 3metadata:
 4  name: example-buckets
 5spec:
 6  region: us-east-2
 7  names:
 8  - crossplane-functions-example-a
 9  - crossplane-functions-example-b
10  - crossplane-functions-example-c

一个 XBuckets XR 有一个区域和一个桶名数组。 该函数将为名称数组中的每个条目创建一个亚马逊网络服务(AWS)S3 桶。

用 Python 写一个函数

1.安装编写函数所需的工具 2.从模板初始化函数 3.编辑模板,添加函数逻辑 4.测试端到端函数 5.构建函数并将其推送到软件包注册库

本指南将详细介绍每个步骤。

安装编写函数所需的工具

要在 Python 中编写一个函数,您需要

Note
你不需要访问 Kubernetes 集群或 crossplane 控制平面,就能构建或测试 Composition 功能。

从模板初始化函数

使用 “crossplane beta xpkg init “命令初始化一个新函数。运行该命令时,它会以一个 GitHub 仓库为模板初始化你的函数。

1crossplane beta xpkg init function-xbuckets https://github.com/crossplane/function-template-python -d function-xbuckets
2Initialized package "function-xbuckets" in directory "/home/negz/control/negz/function-xbuckets" from https://github.com/crossplane/function-template-python/tree/bfed6923ab4c8e7adeed70f41138645fc7d38111 (main)

使用 crossplane beta init xpkg 命令会创建一个名为 function-xbuckets 的目录。 运行该命令后,新目录应如下所示:

1ls function-xbuckets
2Dockerfile example/  function/  LICENSE package/  pyproject.toml README.md renovate.json tests/

您的函数代码位于 function 目录中:

1ls function/
2__version__.py fn.py main.py

function/fn.py 文件是添加函数代码的地方。 了解模板中的其他一些文件很有用:

  • function/main.py 运行函数。你不需要编辑 main.py
  • Dockerfile 运行函数。不需要编辑 Dockerfile
  • package` 目录包含被引用用于构建函数包的元数据。
Tip

在 Crossplane CLI v1.14 中,“crossplane beta xpkg init “只是克隆了一个模板 GitHub 仓库。 未来的 CLI 发布将自动执行用新函数名称替换模板名称等任务。 详情请参见 Crossplane 问题 #4941

在开始添加代码之前,编辑 package/crossplane.yaml 更改软件包名称。 将软件包命名为 function-xbuckets

package/input 目录定义了函数输入的 OpenAPI 模式。 本指南中的函数不接受输入。 删除 package/input 目录。

composition函数](https://docs.crossplane.io/latest/concepts/composition-functions) 文档解释了composition函数的输入。

Tip

如果您正在编写一个被引用的函数,请编辑输入 YAML 文件以满足您的函数要求。

更改输入的种类和 API group。 不要使用 “Input “和 “template.fn.crossplane.io”,而应使用对函数有意义的名称。

编辑模板,添加函数逻辑

您可以在运行函数方法中添加您的函数逻辑。 首次打开该文件时,它包含一个 “hello world “函数。

 1async def RunFunction(self, req: fnv1beta1.RunFunctionRequest, _: grpc.aio.ServicerContext) -> fnv1beta1.RunFunctionResponse:
 2    log = self.log.bind(tag=req.meta.tag)
 3    log.info("Running function")
 4
 5    rsp = response.to(req)
 6
 7    example = ""
 8    if "example" in req.input:
 9        example = req.input["example"]
10
11    # TODO: Add your function logic here!
12    response.normal(rsp, f"I was run with input {example}!")
13    log.info("I was run!", input=example)
14
15    return rsp

所有 Python Composition 函数都有一个 “RunFunction “方法。 crossplane 会在一个RunFunctionRequest对象中传递函数运行所需的一切。

该函数通过返回一个运行函数响应对象。

编辑 RunFunction 方法,将其替换为以下代码。

 1async def RunFunction(self, req: fnv1beta1.RunFunctionRequest, _: grpc.aio.ServicerContext) -> fnv1beta1.RunFunctionResponse:
 2    log = self.log.bind(tag=req.meta.tag)
 3    log.info("Running function")
 4
 5    rsp = response.to(req)
 6
 7    region = req.observed.composite.resource["spec"]["region"]
 8    names = req.observed.composite.resource["spec"]["names"]
 9
10    for name in names:
11        rsp.desired.resources[f"xbuckets-{name}"].resource.update(
12            {
13                "apiVersion": "s3.aws.upbound.io/v1beta1",
14                "kind": "Bucket",
15                "metadata": {
16                    "annotations": {
17                        "crossplane.io/external-name": name,
18                    },
19                },
20                "spec": {
21                    "forProvider": {
22                        "region": region,
23                    },
24                },
25            }
26        )
27
28    log.info("Added desired buckets", region=region, count=len(names))
29
30    return rsp

展开下面的代码块以查看完整的 fn.py,包括导入和解释函数逻辑的注释。

 1"""A Crossplane composition function."""
 2
 3import grpc
 4from crossplane.function import logging, response
 5from crossplane.function.proto.v1beta1 import run_function_pb2 as fnv1beta1
 6from crossplane.function.proto.v1beta1 import run_function_pb2_grpc as grpcv1beta1
 7
 8class FunctionRunner(grpcv1beta1.FunctionRunnerService):
 9    """A FunctionRunner handles gRPC RunFunctionRequests."""
10
11    def __init__(self):
12        """Create a new FunctionRunner."""
13        self.log = logging.get_logger()
14
15    async def RunFunction(
16        self, req: fnv1beta1.RunFunctionRequest, _: grpc.aio.ServicerContext
17    ) -> fnv1beta1.RunFunctionResponse:
18        """Run the function."""
19        # Create a logger for this request.
20        log = self.log.bind(tag=req.meta.tag)
21        log.info("Running function")
22
23        # Create a response to the request. This copies the desired state and
24        # pipeline context from the request to the response.
25        rsp = response.to(req)
26
27        # Get the region and a list of bucket names from the observed composite
28        # resource (XR). Crossplane represents resources using the Struct
29        # well-known protobuf type. The Struct Python object can be accessed
30        # like a dictionary.
31        region = req.observed.composite.resource["spec"]["region"]
32        names = req.observed.composite.resource["spec"]["names"]
33
34        # Add a desired S3 bucket for each name.
35        for name in names:
36            # Crossplane represents desired composed resources using a protobuf
37            # map of messages. This works a little like a Python defaultdict.
38            # Instead of assigning to a new key in the dict-like map, you access
39            # the key and mutate its value as if it did exist.
40            #
41            # The below code works because accessing the xbuckets-{name} key
42            # automatically creates a new, empty fnv1beta1.Resource message. The
43            # Resource message has a resource field containing an empty Struct
44            # object that can be populated from a dictionary by calling update.
45            #
46            # https://protobuf.dev/reference/python/python-generated/#map-fields
47            rsp.desired.resources[f"xbuckets-{name}"].resource.update(
48                {
49                    "apiVersion": "s3.aws.upbound.io/v1beta1",
50                    "kind": "Bucket",
51                    "metadata": {
52                        "annotations": {
53                            "crossplane.io/external-name": name,
54                        },
55                    },
56                    "spec": {
57                        "forProvider": {
58                            "region": region,
59                        },
60                    },
61                }
62            )
63
64        # Log what the function did. This will only appear in the function's pod
65        # logs. A function can use response.normal() and response.warning() to
66        # emit Kubernetes events associated with the XR it's operating on.
67        log.info("Added desired buckets", region=region, count=len(names))
68
69        return rsp

此代码

1.从 RunFunctionRequest 获取观察到的 Composition 资源。 2.从观察到的 Composition 资源中获取区域和水桶名称。 3.为每个桶名添加一个所需的 S3 桶。 4.在 “RunFunctionResponse “中返回所需的 S3 存储桶。

crossplane 提供了一个 软件开发工具包 (SDK),用于用 Python 编写 Composition 函数。本函数被引用了 SDK 中的实用工具。

Tip
Important

Python SDK 会根据 Protocol Buffers 模式自动生成 RunFunctionRequestRunFunctionResponse Python 对象。您可以在 Buf Schema Registry 中查看该模式。

生成的 Python 对象的字段与内置的 Python 类型(如字典和列表)的行为类似。 请注意,它们之间存在一些差异。

值得注意的是,您可以像访问字典一样访问观察到的资源和所需资源的映射,但不能通过为映射键赋值来添加新的所需资源。 相反,访问和 mutation 映射键就好像它已经存在一样。

而不是像这样添加一个新资源:

1resource = {"apiVersion": "example.org/v1", "kind": "Composed", ...}
2rsp.desired.resources["new-resource"] = fnv1beta1.Resource(resource=resource)

假装它已经存在,然后对它进行 mutation,就像这样:

1resource = {"apiVersion": "example.org/v1", "kind": "Composed", ...}
2rsp.desired.resources["new-resource"].resource.update(resource)

更多详情,请参阅协议缓冲区 Python 生成代码指南

测试端到端功能

通过添加单元测试和被引用 “crossplane beta render “命令来测试你的函数。

当你从模板初始化一个函数时,它会在 tests/test_fn.py 中添加一些单元测试。这些测试使用 Python 标准库中的 unittest 模块。

要添加测试用例,请更新 test_run_function 中的 cases 列表。 展开下面的代码块,查看函数的完整 tests/test_fn.py 文件。

 1import dataclasses
 2import unittest
 3
 4from crossplane.function import logging, resource
 5from crossplane.function.proto.v1beta1 import run_function_pb2 as fnv1beta1
 6from google.protobuf import duration_pb2 as durationpb
 7from google.protobuf import json_format
 8from google.protobuf import struct_pb2 as structpb
 9
10from function import fn
11
12class TestFunctionRunner(unittest.IsolatedAsyncioTestCase):
13    def setUp(self) -> None:
14        logging.configure(level=logging.Level.DISABLED)
15        self.maxDiff = 2000
16
17    async def test_run_function(self) -> None:
18        @dataclasses.dataclass
19        class TestCase:
20            reason: str
21            req: fnv1beta1.RunFunctionRequest
22            want: fnv1beta1.RunFunctionResponse
23
24        cases = [
25            TestCase(
26                reason="The function should compose two S3 buckets.",
27                req=fnv1beta1.RunFunctionRequest(
28                    observed=fnv1beta1.State(
29                        composite=fnv1beta1.Resource(
30                            resource=resource.dict_to_struct(
31                                {
32                                    "apiVersion": "example.crossplane.io/v1alpha1",
33                                    "kind": "XBuckets",
34                                    "metadata": {"name": "test"},
35                                    "spec": {
36                                        "region": "us-east-2",
37                                        "names": ["test-bucket-a", "test-bucket-b"],
38                                    },
39                                }
40                            )
41                        )
42                    )
43                ),
44                want=fnv1beta1.RunFunctionResponse(
45                    meta=fnv1beta1.ResponseMeta(ttl=durationpb.Duration(seconds=60)),
46                    desired=fnv1beta1.State(
47                        resources={
48                            "xbuckets-test-bucket-a": fnv1beta1.Resource(
49                                resource=resource.dict_to_struct(
50                                    {
51                                        "apiVersion": "s3.aws.upbound.io/v1beta1",
52                                        "kind": "Bucket",
53                                        "metadata": {
54                                            "annotations": {
55                                                "crossplane.io/external-name": "test-bucket-a"
56                                            },
57                                        },
58                                        "spec": {
59                                            "forProvider": {"region": "us-east-2"}
60                                        },
61                                    }
62                                )
63                            ),
64                            "xbuckets-test-bucket-b": fnv1beta1.Resource(
65                                resource=resource.dict_to_struct(
66                                    {
67                                        "apiVersion": "s3.aws.upbound.io/v1beta1",
68                                        "kind": "Bucket",
69                                        "metadata": {
70                                            "annotations": {
71                                                "crossplane.io/external-name": "test-bucket-b"
72                                            },
73                                        },
74                                        "spec": {
75                                            "forProvider": {"region": "us-east-2"}
76                                        },
77                                    }
78                                )
79                            ),
80                        },
81                    ),
82                    context=structpb.Struct(),
83                ),
84            ),
85        ]
86
87        runner = fn.FunctionRunner()
88
89        for case in cases:
90            got = await runner.RunFunction(case.req, None)
91            self.assertEqual(
92                json_format.MessageToDict(got),
93                json_format.MessageToDict(case.want),
94                "-want, +got",
95            )
96
97if __name__ == "__main__":
98    unittest.main()

使用 hatch run 运行单元测试:

1hatch run test:unit
2.
3----------------------------------------------------------------------
4Ran 1 test in 0.003s
5
6OK
Tip
Hatch是一个 Python 构建工具。它可以构建类似 wheels 的 Python 构件。它还可以管理虚拟环境,类似于 virtualenvvenvhatch run 命令会创建一个虚拟环境,并在该环境中运行命令。

您可以使用 Crossplane CLI 预览被引用此功能的 Composition 的 Output,不需要使用 Crossplane 控制平面就能完成此操作。

function-xbuckets 下创建名为 example 的目录,并创建 Composite Resource、Composition 和 Function YAML 文件。

展开以下区块,查看示例文件。

您可以使用这些文件,通过运行 crossplane beta render 重现下面的输出结果。

XR.yaml` 文件包含要渲染的 Composition 资源:

 1apiVersion: example.crossplane.io/v1
 2kind: XBuckets
 3metadata:
 4  name: example-buckets
 5spec:
 6  region: us-east-2
 7  names:
 8  - crossplane-functions-example-a
 9  - crossplane-functions-example-b
10  - crossplane-functions-example-c

composition.yaml` 文件包含用于渲染复合资源的 Composition:

 1apiVersion: apiextensions.crossplane.io/v1
 2kind: Composition
 3metadata:
 4  name: create-buckets
 5spec:
 6  compositeTypeRef:
 7    apiVersion: example.crossplane.io/v1
 8    kind: XBuckets
 9  mode: Pipeline
10  pipeline:
11  - step: create-buckets
12    functionRef:
13      name: function-xbuckets

functions.yaml` 文件包含 Composition 在其 Pipelines 步骤中引用的函数:

 1apiVersion: pkg.crossplane.io/v1beta1
 2kind: Function
 3metadata:
 4  name: function-xbuckets
 5  annotations:
 6    render.crossplane.io/runtime: Development
 7spec:
 8  # The CLI ignores this package when using the Development runtime.
 9  # You can set it to any value.
10  package: xpkg.upbound.io/negz/function-xbuckets:v0.1.0

functions.yaml中的函数被引用为开发运行时。 这会告诉crossplane beta render` 您的函数正在本地运行。 它会连接到您本地运行的函数,而不是被引用 Docker 来拉动和运行函数。

1apiVersion: pkg.crossplane.io/v1beta1
2kind: Function
3metadata:
4  name: function-xbuckets
5  annotations:
6    render.crossplane.io/runtime: Development

使用 hatch run development 在本地运行您的函数。

1hatch run development
Warning
hatch run development 在不进行加密或身份验证的情况下运行函数。 仅在测试和开发过程中使用。

在另一个终端中,运行 crossplane beta render

1crossplane beta render xr.yaml composition.yaml functions.yaml

该命令调用你的函数。 在运行函数的终端中,现在应该可以看到 logging 输出:

1hatch run development
22024-01-11T22:12:58.153572Z [info     ] Running function filename=fn.py lineno=22 tag=
32024-01-11T22:12:58.153792Z [info     ] Added desired buckets count=3 filename=fn.py lineno=68 region=us-east-2 tag=

crossplane beta render` 命令会打印函数返回的所需资源。

 1---
 2apiVersion: example.crossplane.io/v1
 3kind: XBuckets
 4metadata:
 5  name: example-buckets
 6---
 7apiVersion: s3.aws.upbound.io/v1beta1
 8kind: Bucket
 9metadata:
10  annotations:
11    crossplane.io/composition-resource-name: xbuckets-crossplane-functions-example-b
12    crossplane.io/external-name: crossplane-functions-example-b
13  generateName: example-buckets-
14  labels:
15    crossplane.io/composite: example-buckets
16  ownerReferences:
17    # Omitted for brevity
18spec:
19  forProvider:
20    region: us-east-2
21---
22apiVersion: s3.aws.upbound.io/v1beta1
23kind: Bucket
24metadata:
25  annotations:
26    crossplane.io/composition-resource-name: xbuckets-crossplane-functions-example-c
27    crossplane.io/external-name: crossplane-functions-example-c
28  generateName: example-buckets-
29  labels:
30    crossplane.io/composite: example-buckets
31  ownerReferences:
32    # Omitted for brevity
33spec:
34  forProvider:
35    region: us-east-2
36---
37apiVersion: s3.aws.upbound.io/v1beta1
38kind: Bucket
39metadata:
40  annotations:
41    crossplane.io/composition-resource-name: xbuckets-crossplane-functions-example-a
42    crossplane.io/external-name: crossplane-functions-example-a
43  generateName: example-buckets-
44  labels:
45    crossplane.io/composite: example-buckets
46  ownerReferences:
47    # Omitted for brevity
48spec:
49  forProvider:
50    region: us-east-2
Tip
请阅读composition函数文档,了解有关 测试composition函数 的更多信息。

构建函数并将其推送至 packages 注册表

构建函数分为两个阶段: 首先是构建函数的运行时,这是 Crossplane 用来运行函数的开放容器倡议(OCI)镜像。 然后将运行时嵌入软件包,并将其推送到软件包注册中心。 Crossplane CLI 将 xpkg.upbound.io 作为默认的软件包注册中心。

一个函数默认支持单个平台,如 “linux/amd64”,您可以为每个平台构建运行时和软件包,然后将所有软件包推送到注册表中的单个标签,从而支持多个平台。

将您的函数推送到 registry,就可以在 crossplane 控制平面中使用您的函数。请参阅Composition functions documentation。了解如何在控制平面中使用函数。

使用 docker 为每个平台构建运行时。

1docker build . --quiet --platform=linux/amd64 --tag runtime-amd64
2sha256:fdf40374cc6f0b46191499fbc1dbbb05ddb76aca854f69f2912e580cfe624b4b
1docker build . --quiet --platform=linux/arm64 --tag runtime-arm64
2sha256:cb015ceabf46d2a55ccaeebb11db5659a2fb5e93de36713364efcf6d699069af
Tip
您可以使用任何标签,无需将运行时镜像推送到 registry。 标签只是用来告诉 crossplane xpkg build 嵌入什么运行时。
Important
Docker 使用仿真技术为不同平台创建镜像。 如果为不同平台构建镜像失败,请确保已安装 binfmt。有关说明,请参阅 Docker 文档

使用 Crossplane CLI 为每个平台构建一个软件包。 每个软件包都嵌入了一个运行时镜像。

……。 --package-rootflag 指定了包含 crossplane.yamlpackage 目录。 其中包括软件包的元数据。

……。 --嵌入运行时镜像flag 指定了被引用 Docker 构建的运行时镜像标签。

--package-file标志指定将软件包文件写入磁盘的位置。 crossplane 软件包文件的扩展名为 .xpkg

1crossplane xpkg build \
2    --package-root=package \
3    --embed-runtime-image=runtime-amd64 \
4    --package-file=function-amd64.xpkg
1crossplane xpkg build \
2    --package-root=package \
3    --embed-runtime-image=runtime-arm64 \
4    --package-file=function-arm64.xpkg
Tip
crossplane 软件包是特殊的 OCI 镜像。请在【软件包文档】(https://docs.crossplane.io/latest/concepts/packages) 中阅读有关软件包的更多信息。

将两个软件包文件都推送到注册表中。将两个文件都推送到注册表中的一个标签,就能创建一个多平台 软件包,在 linux/arm64linux/amd64 主机上都能运行。

1crossplane xpkg push \
2  --package-files=function-amd64.xpkg,function-arm64.xpkg \
3  negz/function-xbuckets:v0.1.0
Tip

如果您将函数推送到 GitHub 仓库,模板会使用 GitHub Actions 自动设置持续集成 (CI)。CI 工作流将对您的函数进行校验、测试和构建。您可以通过阅读 .github/workflows/ci.yaml,查看模板是如何配置 CI 的。

CI 工作流可以自动将软件包推送到 xpkg.upbound.io。要做到这一点,您必须在 https://marketplace.upbound.io 创建一个版本库。通过创建一个 API 令牌并将其添加到您的版本库,赋予 CI 工作流向市场推送的权限。将您的 API 令牌访问 ID 保存为名为 XPKG_ACCESS_ID 的secret,并将您的 API 令牌保存为名为 XPKG_TOKEN 的secret。